package com.tca.common.learning.webflux.reactor.sink;

import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author zhoua
 * @date 2022/1/8 15:34
 *
 * generate api
 *
 * 1.public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
 *
 * 2.public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
 *
 * 3.public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
 */
public class GenerateTest {

    public static void main(String[] args) {

        generateApi1();

        generateApi2();

        generateApi3();

    }

    /**
     * public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
     */
    private static void generateApi3() {
        final int total = 4;
        Flux.generate(
                () -> 0,
                (count, sink) -> {
                    sink.next(count);
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                    if (count == total) {
                        sink.complete();
                    }
                    return ++count;
                },
                // 在处理完成后最后调用
                System.out::println
        ).subscribe(System.out::println);
    }

    /**
     * public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
     * stateSupplier --> 初始化状态
     * generator --> 数据流产生
     */
    private static void generateApi2() {
        final int total = 4;
        Flux.generate(
                () -> 0,
                (count, sink) -> {
                    sink.next(count);
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                    if (count == total) {
                        sink.complete();
                    }
                    return ++count;
                }
        ).subscribe(System.out::println);
    }

    /**
     * public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
     */
    private static void generateApi1() {
        final AtomicInteger i = new AtomicInteger();
        final int total = 5;
        Flux.generate(synchronousSink -> {
            synchronousSink.next(i.getAndIncrement());
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
            }
            if (i.get() == total) {
                synchronousSink.complete();
            }
        }).subscribe(System.out::println);
    }
}
